Quay lại danh sách bài viết

Untitled

30 tháng 11, 2025
admin
Untitled
# 🛠️ Thực hành và tối ưu ## Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python ![Bắt lỗi và log chi tiết SQL Server](/img/blog/sql-error-logging.jpg) ## Giới thiệu Khi làm việc với hệ thống phân tích dữ liệu kết hợp Python và SQL Server, việc xử lý lỗi và ghi log chi tiết là một phần không thể thiếu để đảm bảo tính ổn định và khả năng bảo trì của ứng dụng. Một hệ thống ghi log được thiết kế tốt giúp phát hiện, phân tích và khắc phục lỗi nhanh chóng, đồng thời cung cấp thông tin quý giá về hiệu suất và hành vi của hệ thống. Bài viết này sẽ hướng dẫn chi tiết các kỹ thuật bắt lỗi và thiết lập hệ thống log hiệu quả khi thao tác với SQL Server từ Python. ## 1. Tổng quan về xử lý lỗi và logging trong Python ### 1.1. Các loại lỗi thường gặp khi làm việc với SQL Server Khi thao tác với SQL Server từ Python, chúng ta thường gặp các loại lỗi sau: 1. **Lỗi kết nối**: Không thể kết nối đến máy chủ SQL Server 2. **Lỗi xác thực**: Sai thông tin đăng nhập 3. **Lỗi cú pháp SQL**: Lỗi trong câu lệnh SQL 4. **Lỗi thời gian chờ**: Truy vấn mất quá nhiều thời gian để thực thi 5. **Lỗi ràng buộc dữ liệu**: Vi phạm các ràng buộc như khóa ngoại, giá trị duy nhất, v.v 6. **Lỗi chuyển đổi kiểu dữ liệu**: Không thể chuyển đổi dữ liệu giữa Python và SQL Server 7. **Lỗi tài nguyên**: Hết bộ nhớ, kết nối, v.v ### 1.2. Hệ thống log trong Python Python cung cấp module logging tiêu chuẩn giúp ghi log với nhiều cấp độ khác nhau: - **DEBUG**: Thông tin chi tiết, thường dùng khi gỡ lỗi - **INFO**: Xác nhận mọi thứ đang hoạt động như mong đợi - **WARNING**: Chỉ ra rằng có điều gì đó không mong muốn xảy ra, nhưng ứng dụng vẫn hoạt động - **ERROR**: Do lỗi, ứng dụng không thể thực hiện một số chức năng - **CRITICAL**: Lỗi nghiêm trọng, ứng dụng có thể không tiếp tục hoạt động ## 2. Thiết lập cơ bản cho logging ### 2.1. Thiết lập logging cơ bản ```python import logging # Cấu hình cơ bản logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename='sql_operations.log', filemode='a' # Append mode ) # Tạo logger logger = logging.getLogger('sql_server_operations') ``` ### 2.2. Cấu hình handlers đa dạng ```python import logging import os from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler def setup_logger(name, log_file, level=logging.INFO): """Thiết lập logger với file và console handlers""" # Tạo thư mục logs nếu chưa tồn tại log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) # Tạo và cấu hình logger logger = logging.getLogger(name) logger.setLevel(level) # Ngăn log trùng lặp if logger.handlers: return logger # Tạo file handler sử dụng RotatingFileHandler file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5 ) file_handler.setLevel(level) # Tạo console handler console_handler = logging.StreamHandler() console_handler.setLevel(level) # Tạo formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # Thêm handlers vào logger logger.addHandler(file_handler) logger.addHandler(console_handler) return logger # Sử dụng sql_logger = setup_logger( 'sql_server_operations', os.path.join('logs', 'sql_operations.log') ) ``` ### 2.3. Sử dụng TimedRotatingFileHandler để phân chia log theo thời gian ```python def setup_timed_logger(name, log_file, level=logging.INFO): """Thiết lập logger với TimedRotatingFileHandler để phân chia log theo ngày""" logger = logging.getLogger(name) logger.setLevel(level) if logger.handlers: return logger # Tạo thư mục logs nếu chưa tồn tại log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) # Tạo file handler sử dụng TimedRotatingFileHandler # Phân chia file log mỗi ngày, giữ lại 30 file file_handler = TimedRotatingFileHandler( log_file, when='midnight', interval=1, backupCount=30 ) file_handler.setLevel(level) # Tạo formatter bao gồm nhiều thông tin hơn formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s' ) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger # Sử dụng detailed_logger = setup_timed_logger( 'sql_detailed_operations', os.path.join('logs', 'sql_operations_detailed.log') ) ``` ## 3. Bắt và xử lý lỗi SQL Server ### 3.1. Xử lý lỗi cơ bản với try-except ```python import pyodbc import logging logger = logging.getLogger('sql_server_operations') def execute_query(conn_string, query, params=None): """Thực thi truy vấn SQL với xử lý lỗi cơ bản""" conn = None cursor = None try: # Thiết lập kết nối conn = pyodbc.connect(conn_string) cursor = conn.cursor() # Thực thi truy vấn logger.info(f"Thực thi truy vấn: {query[:100]}...") if params: cursor.execute(query, params) else: cursor.execute(query) # Commit nếu là truy vấn thay đổi dữ liệu if query.strip().upper().startswith(('INSERT', 'UPDATE', 'DELETE')): conn.commit() logger.info("Đã commit thay đổi") return cursor.rowcount # Trả về số hàng bị ảnh hưởng # Lấy kết quả nếu là truy vấn SELECT results = cursor.fetchall() logger.info(f"Truy vấn trả về {len(results)} kết quả") return results except pyodbc.Error as e: if conn: conn.rollback() logger.error(f"Lỗi SQL: {str(e)}") raise except Exception as e: if conn: conn.rollback() logger.error(f"Lỗi không xác định: {str(e)}") raise finally: # Đảm bảo đóng cursor và connection if cursor: cursor.close() if conn: conn.close() logger.debug("Đã đóng kết nối") ``` ### 3.2. Xử lý lỗi chi tiết theo từng loại lỗi ```python import pyodbc import time import logging from functools import wraps logger = logging.getLogger('sql_detailed_operations') # Định nghĩa các mã lỗi SQL Server phổ biến SQL_TIMEOUT_ERROR = '08S01' # Timeout SQL_CONNECTION_ERROR = '08001' # Không thể kết nối SQL_CONSTRAINT_VIOLATION = '23000' # Vi phạm ràng buộc def retry_on_connection_error(max_attempts=3, delay=2): """Decorator để thử lại khi gặp lỗi kết nối""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): attempts = 0 last_exception = None while attempts < max_attempts: try: return func(*args, **kwargs) except pyodbc.Error as e: error_code = e.args[0] if len(e.args) > 0 else "Unknown" # Chỉ thử lại với lỗi kết nối if error_code in (SQL_TIMEOUT_ERROR, SQL_CONNECTION_ERROR): attempts += 1 wait_time = delay * attempts # Tăng thời gian chờ theo số lần thử logger.warning( f"Lỗi kết nối (mã: {error_code}). " f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây. " f"Chi tiết: {str(e)}" ) time.sleep(wait_time) last_exception = e else: # Với các lỗi khác thì ném ra ngay raise # Nếu đã thử hết số lần mà vẫn lỗi logger.error(f"Đã thử {max_attempts} lần nhưng vẫn thất bại: {str(last_exception)}") raise last_exception return wrapper return decorator class SQLServerError(Exception): """Lớp cơ sở cho các lỗi SQL Server tùy chỉnh""" def __init__(self, message, original_error=None, query=None, params=None): self.message = message self.original_error = original_error self.query = query self.params = params super().__init__(self.message) class SQLConnectionError(SQLServerError): """Lỗi kết nối đến SQL Server""" pass class SQLConstraintError(SQLServerError): """Lỗi vi phạm ràng buộc dữ liệu""" pass class SQLTimeoutError(SQLServerError): """Lỗi timeout khi thực thi truy vấn""" pass class SQLSyntaxError(SQLServerError): """Lỗi cú pháp SQL""" pass @retry_on_connection_error(max_attempts=3, delay=2) def execute_query_advanced(conn_string, query, params=None, timeout=30): """Thực thi truy vấn SQL với xử lý lỗi chi tiết""" conn = None cursor = None start_time = time.time() try: # Log thông tin truy vấn if params: masked_params = ['***' if i > 1 else str(p)[:10] for i, p in enumerate(params)] logger.info(f"Thực thi truy vấn với tham số: {query[:100]}... - Params: {masked_params}") else: logger.info(f"Thực thi truy vấn: {query[:100]}...") # Thiết lập kết nối conn = pyodbc.connect(conn_string, timeout=timeout) cursor = conn.cursor() # Thực thi truy vấn if params: cursor.execute(query, params) else: cursor.execute(query) # Đo thời gian thực thi execution_time = time.time() - start_time # Xác định loại truy vấn và xử lý kết quả phù hợp query_type = query.strip().upper().split()[0] if query.strip() else "" if query_type in ('INSERT', 'UPDATE', 'DELETE'): conn.commit() affected_rows = cursor.rowcount logger.info(f"Đã commit thay đổi. {affected_rows} hàng bị ảnh hưởng. " f"Thời gian thực thi: {execution_time:.3f}s") return affected_rows elif query_type == 'SELECT': columns = [column[0] for column in cursor.description] results = cursor.fetchall() row_count = len(results) logger.info(f"Truy vấn SELECT thành công. Trả về {row_count} kết quả. " f"Thời gian thực thi: {execution_time:.3f}s") # Log mẫu dữ liệu (chỉ log vài hàng đầu tiên để tránh quá tải) if row_count > 0 and logger.level <= logging.DEBUG: sample_data = str(results[0]) if len(sample_data) > 200: sample_data = sample_data[:200] + "..." logger.debug(f"Mẫu dữ liệu: {sample_data}") # Trả về kết quả dưới dạng list of dict để dễ sử dụng return [dict(zip(columns, row)) for row in results] else: conn.commit() logger.info(f"Đã thực thi truy vấn. Thời gian thực thi: {execution_time:.3f}s") return True except pyodbc.Error as e: # Rollback transaction nếu có lỗi if conn: try: conn.rollback() logger.info("Đã rollback transaction") except Exception: pass # Xác định mã lỗi error_code = e.args[0] if len(e.args) > 0 else "Unknown" error_message = str(e) # Ghi log và ném ra exception tùy chỉnh tương ứng if error_code == SQL_CONNECTION_ERROR: logger.error(f"Lỗi kết nối SQL Server: {error_message}") raise SQLConnectionError("Không thể kết nối đến SQL Server", e, query, params) elif error_code == SQL_TIMEOUT_ERROR: logger.error(f"Lỗi timeout SQL: {error_message}") raise SQLTimeoutError("Truy vấn bị timeout", e, query, params) elif error_code == SQL_CONSTRAINT_VIOLATION: logger.error(f"Lỗi vi phạm ràng buộc dữ liệu: {error_message}") raise SQLConstraintError("Vi phạm ràng buộc dữ liệu", e, query, params) elif 'syntax' in error_message.lower(): logger.error(f"Lỗi cú pháp SQL: {error_message}") raise SQLSyntaxError("Lỗi cú pháp trong truy vấn SQL", e, query, params) else: logger.error(f"Lỗi SQL không xác định (mã: {error_code}): {error_message}") raise SQLServerError(f"Lỗi SQL Server: {error_message}", e, query, params) except Exception as e: # Xử lý các lỗi khác không phải từ SQL Server if conn: try: conn.rollback() logger.info("Đã rollback transaction") except Exception: pass logger.error(f"Lỗi không xác định: {str(e)}", exc_info=True) raise finally: # Đảm bảo đóng cursor và connection if cursor: cursor.close() if conn: conn.close() logger.debug("Đã đóng kết nối DB") ``` ## 4. Thiết kế hệ thống log toàn diện ### 4.1. Tạo lớp Logger tùy chỉnh ```python import logging import os import json import traceback import socket from datetime import datetime from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler class SQLLogger: """Lớp logger tùy chỉnh cho các thao tác với SQL Server""" def __init__(self, app_name, log_dir='logs', log_level=logging.INFO): """Khởi tạo hệ thống log""" self.app_name = app_name self.log_dir = log_dir self.log_level = log_level self.hostname = socket.gethostname() # Tạo thư mục logs nếu chưa tồn tại if not os.path.exists(log_dir): os.makedirs(log_dir) # Thiết lập các logger self.setup_loggers() def setup_loggers(self): """Thiết lập các logger khác nhau cho từng mục đích""" # Logger cho các thao tác SQL thông thường self.sql_logger = self._create_logger( 'sql_operations', os.path.join(self.log_dir, 'sql_operations.log'), self.log_level ) # Logger chi tiết cho lỗi self.error_logger = self._create_logger( 'sql_errors', os.path.join(self.log_dir, 'sql_errors.log'), logging.ERROR ) # Logger cho các truy vấn chậm self.slow_query_logger = self._create_logger( 'slow_queries', os.path.join(self.log_dir, 'slow_queries.log'), logging.WARNING ) def _create_logger(self, name, log_file, level): """Tạo logger với file handler và formatter""" # Tạo logger với tên ứng dụng làm prefix logger_name = f"{self.app_name}.{name}" logger = logging.getLogger(logger_name) logger.setLevel(level) # Ngăn log trùng lặp if logger.handlers: return logger # Tạo file handler với rotation file_handler = TimedRotatingFileHandler( log_file, when='midnight', interval=1, backupCount=30 ) file_handler.setLevel(level) # Tạo formatter chi tiết formatter = logging.Formatter( '%(asctime)s - %(levelname)s - %(name)s - [%(hostname)s] - ' '%(pathname)s:%(lineno)d - %(message)s' ) # Thêm thông tin hostname vào formatter old_factory = logging.getLogRecordFactory() def record_factory(*args, **kwargs): record = old_factory(*args, **kwargs) record.hostname = self.hostname return record logging.setLogRecordFactory(record_factory) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger def log_query(self, query, params=None, duration=None, result_count=None): """Ghi log truy vấn SQL""" # Tạo thông tin log log_data = { 'query': query[:500] + ('...' if len(query) > 500 else ''), 'timestamp': datetime.now().isoformat(), 'hostname': self.hostname } # Thêm params nếu có (che dấu thông tin nhạy cảm) if params: masked_params = [] for p in params: if isinstance(p, str) and len(p) > 10: masked_params.append(p[:5] + '...' + p[-2:]) else: masked_params.append(p) log_data['params'] = masked_params # Thêm thời gian thực thi nếu có if duration: log_data['duration'] = f"{duration:.3f}s" # Log truy vấn chậm (>1s) vào logger riêng if duration > 1.0: self.slow_query_logger.warning( f"Truy vấn chậm: {log_data['query']} - " f"Thời gian: {log_data['duration']}" ) # Thêm số lượng kết quả nếu có if result_count is not None: log_data['result_count'] = result_count # Ghi log thông thường self.sql_logger.info(f"SQL Query: {json.dumps(log_data)}") return log_data def log_error(self, error, query=None, params=None, context=None): """Ghi log lỗi SQL với thông tin chi tiết""" error_type = type(error).__name__ error_message = str(error) stack_trace = traceback.format_exc() # Tạo thông tin log error_data = { 'error_type': error_type, 'error_message': error_message, 'timestamp': datetime.now().isoformat(), 'hostname': self.hostname, 'stack_trace': stack_trace } # Thêm query nếu có if query: error_data['query'] = query[:500] + ('...' if len(query) > 500 else '') # Thêm params nếu có (che dấu thông tin nhạy cảm) if params: masked_params = [] for p in params: if isinstance(p, str) and len(p) > 10: masked_params.append(p[:5] + '...' + p[-2:]) else: masked_params.append(p) error_data['params'] = masked_params # Thêm context nếu có if context: error_data['context'] = context # Ghi log lỗi self.error_logger.error(f"SQL Error: {json.dumps(error_data)}") # Đồng thời ghi log thông thường self.sql_logger.error( f"SQL Error: {error_type} - {error_message}" ) return error_data def log_transaction(self, action, affected_rows=None, duration=None): """Ghi log các thao tác transaction""" log_data = { 'action': action, 'timestamp': datetime.now().isoformat(), 'hostname': self.hostname } if affected_rows is not None: log_data['affected_rows'] = affected_rows if duration: log_data['duration'] = f"{duration:.3f}s" self.sql_logger.info(f"Transaction: {json.dumps(log_data)}") return log_data ``` ### 4.2. Tích hợp logger tùy chỉnh với thao tác SQL ```python import pyodbc import time from contextlib import contextmanager class SQLServerDatabase: """Lớp quản lý kết nối và thao tác với SQL Server kèm logging""" def __init__(self, conn_string, app_name="SQLApp", log_dir="logs"): """Khởi tạo với chuỗi kết nối và thiết lập logger""" self.conn_string = conn_string self.logger = SQLLogger(app_name, log_dir) @contextmanager def connection(self): """Context manager để quản lý kết nối tự động đóng""" conn = None try: conn = pyodbc.connect(self.conn_string) yield conn except pyodbc.Error as e: self.logger.log_error(e, context="establishing connection") raise finally: if conn: conn.close() @contextmanager def transaction(self): """Context manager để quản lý transaction""" with self.connection() as conn: try: # Bắt đầu transaction start_time = time.time() self.logger.log_transaction("START") yield conn # Commit transaction nếu không có lỗi conn.commit() duration = time.time() - start_time self.logger.log_transaction("COMMIT", duration=duration) except Exception as e: # Rollback transaction nếu có lỗi conn.rollback() duration = time.time() - start_time self.logger.log_transaction("ROLLBACK", duration=duration) # Log lỗi self.logger.log_error(e, context="transaction") raise def execute_query(self, query, params=None): """Thực thi truy vấn và trả về kết quả""" with self.connection() as conn: try: start_time = time.time() cursor = conn.cursor() # Thực thi truy vấn if params: cursor.execute(query, params) else: cursor.execute(query) # Lấy kết quả nếu là truy vấn SELECT if query.strip().upper().startswith("SELECT"): columns = [column[0] for column in cursor.description] results = cursor.fetchall() result_count = len(results) # Tính thời gian thực thi duration = time.time() - start_time # Log truy vấn self.logger.log_query( query, params, duration=duration, result_count=result_count ) # Trả về kết quả dưới dạng list of dict return [dict(zip(columns, row)) for row in results] else: # Đối với các truy vấn thay đổi dữ liệu affected_rows = cursor.rowcount # Tính thời gian thực thi duration = time.time() - start_time # Log truy vấn self.logger.log_query( query, params, duration=duration, result_count=affected_rows ) # Commit thay đổi conn.commit() # Trả về số hàng bị ảnh hưởng return affected_rows except Exception as e: # Log lỗi self.logger.log_error(e, query, params) raise def execute_many(self, query, params_list): """Thực thi nhiều truy vấn với danh sách tham số""" with self.transaction() as conn: try: start_time = time.time() cursor = conn.cursor() # Thực thi executemany cursor.executemany(query, params_list) # Tính thời gian thực thi duration = time.time() - start_time # Lấy số hàng bị ảnh hưởng affected_rows = cursor.rowcount # Log thông tin self.logger.log_query( query, f"[{len(params_list)} parameter sets]", duration=duration, result_count=affected_rows ) return affected_rows except Exception as e: # Log lỗi self.logger.log_error(e, query, f"[{len(params_list)} parameter sets]") raise def bulk_insert(self, table_name, data, batch_size=1000): """Thực hiện bulk insert với logging chi tiết""" if not data: return 0 total_rows = len(data) total_batches = (total_rows + batch_size - 1) // batch_size total_inserted = 0 start_total_time = time.time() self.logger.sql_logger.info( f"Bắt đầu bulk insert vào bảng {table_name}. " f"{total_rows} hàng, {total_batches} batch(es)" ) try: # Lấy tên các cột từ dữ liệu if isinstance(data[0], dict): columns = list(data[0].keys()) # Chuyển dữ liệu từ dict sang list data_values = [[row[col] for col in columns] for row in data] else: raise ValueError("Data phải là list of dict") # Xây dựng câu truy vấn insert placeholders = ','.join('?' for _ in columns) column_names = ','.join(columns) query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})" # Thực hiện insert theo batch for i in range(0, total_rows, batch_size): batch_start_time = time.time() # Lấy một batch dữ liệu batch = data_values[i:i+batch_size] batch_size_actual = len(batch) # Thực hiện insert with self.transaction() as conn: cursor = conn.cursor() cursor.executemany(query, batch) batch_affected = cursor.rowcount # Tính thời gian và log batch_duration = time.time() - batch_start_time total_inserted += batch_affected self.logger.sql_logger.info( f"Batch {(i//batch_size)+1}/{total_batches}: " f"Đã insert {batch_affected}/{batch_size_actual} hàng " f"trong {batch_duration:.3f}s" ) # Log tổng kết total_duration = time.time() - start_total_time self.logger.sql_logger.info( f"Hoàn thành bulk insert vào bảng {table_name}. " f"Tổng số: {total_inserted}/{total_rows} hàng " f"trong {total_duration:.3f}s" ) return total_inserted except Exception as e: # Log lỗi self.logger.log_error( e, context=f"bulk_insert to {table_name}, {total_rows} rows" ) raise ``` ## 5. Ứng dụng thực tế ### 5.1. Ví dụ sử dụng lớp Database với xử lý lỗi ```python # Ví dụ sử dụng lớp Database để thao tác với SQL Server conn_string = 'DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password' # Khởi tạo đối tượng Database db = SQLServerDatabase(conn_string, app_name="SalesAnalytics", log_dir="logs/sales") try: # Truy vấn đơn giản results = db.execute_query( "SELECT TOP 10 * FROM DuLieuBanHang WHERE NgayBan >= ?", ['2024-01-01'] ) print(f"Lấy được {len(results)} kết quả") # Thực hiện insert với transaction with db.transaction() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO DuLieuBanHang (NgayBan, MaSanPham, SoLuong, DoanhThu) VALUES (?, ?, ?, ?) """, ['2024-05-01', 'SP001', 5, 1500000]) # Có thể thực hiện nhiều lệnh trong cùng một transaction cursor.execute(""" UPDATE TongKetDoanhThu SET TongDoanhThu = TongDoanhThu + ? WHERE Thang = 5 AND Nam = 2024 """, [1500000]) # Bulk insert dữ liệu sales_data = [ { 'NgayBan': '2024-05-01', 'MaSanPham': f'SP{i:03d}', 'SoLuong': i % 10 + 1, 'DoanhThu': (i % 10 + 1) * 300000 } for i in range(1, 101) ] inserted = db.bulk_insert('DuLieuBanHang', sales_data, batch_size=20) print(f"Đã insert {inserted} hàng dữ liệu") except SQLConnectionError as e: print(f"Lỗi kết nối: {e.message}") # Thử kết nối lại hoặc thông báo cho người dùng except SQLConstraintError as e: print(f"Lỗi ràng buộc dữ liệu: {e.message}") # Kiểm tra và sửa dữ liệu except SQLTimeoutError as e: print(f"Truy vấn bị timeout: {e.message}") # Tối ưu truy vấn hoặc tăng timeout except SQLSyntaxError as e: print(f"Lỗi cú pháp SQL: {e.message}") # Sửa lỗi cú pháp trong truy vấn except SQLServerError as e: print(f"Lỗi SQL Server: {e.message}") # Xử lý lỗi chung từ SQL Server except Exception as e: print(f"Lỗi không xác định: {str(e)}") # Ghi log và thông báo lỗi chung ``` ### 5.2. Xử lý lỗi khi làm việc với pandas và SQLAlchemy ```python import pandas as pd from sqlalchemy import create_engine, text import urllib import logging # Thiết lập logger logger = logging.getLogger('pandas_sql') logger.setLevel(logging.INFO) handler = logging.FileHandler('logs/pandas_sql.log') handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) logger.addHandler(handler) def create_sql_engine(server, database, username, password): """Tạo SQLAlchemy engine với xử lý lỗi""" try: # Tạo chuỗi kết nối params = urllib.parse.quote_plus( f"DRIVER={{SQL Server}};SERVER={server};" f"DATABASE={database};UID={username};PWD={password}" ) # Tạo engine với cấu hình engine = create_engine( f"mssql+pyodbc:///?odbc_connect={params}", pool_pre_ping=True, # Kiểm tra kết nối trước khi sử dụng pool_recycle=3600, # Làm mới kết nối sau 1 giờ connect_args={'timeout': 30} # Timeout kết nối là 30 giây ) # Kiểm tra kết nối with engine.connect() as conn: result = conn.execute(text("SELECT 1")) if result.scalar() == 1: logger.info(f"Kết nối thành công đến {server}/{database}") return engine else: raise Exception("Kiểm tra kết nối không thành công") except Exception as e: logger.error(f"Lỗi tạo kết nối SQL: {str(e)}") raise def read_sql_with_logging(query, engine, params=None): """Đọc dữ liệu từ SQL với pandas và log""" try: start_time = time.time() logger.info(f"Bắt đầu đọc dữ liệu với query: {query[:200]}...") # Thực hiện truy vấn if params: df = pd.read_sql(query, engine, params=params) else: df = pd.read_sql(query, engine) # Log kết quả duration = time.time() - start_time row_count = len(df) col_count = len(df.columns) logger.info( f"Hoàn thành đọc dữ liệu: {row_count} hàng × {col_count} cột " f"trong {duration:.3f}s" ) # Log thông tin về bộ nhớ sử dụng memory_usage = df.memory_usage(deep=True).sum() logger.info(f"Bộ nhớ sử dụng: {memory_usage/1024/1024:.2f} MB") return df except Exception as e: logger.error(f"Lỗi đọc dữ liệu SQL: {str(e)}") raise def write_to_sql_with_logging(df, table_name, engine, if_exists='replace', chunksize=1000): """Ghi DataFrame vào SQL Server với logging""" try: start_time = time.time() row_count = len(df) logger.info( f"Bắt đầu ghi {row_count} hàng vào bảng {table_name}, " f"chế độ: {if_exists}, kích thước chunk: {chunksize}" ) # Thực hiện ghi dữ liệu df.to_sql( table_name, engine, if_exists=if_exists, chunksize=chunksize, index=False ) # Log kết quả duration = time.time() - start_time logger.info( f"Hoàn thành ghi dữ liệu vào bảng {table_name}: " f"{row_count} hàng trong {duration:.3f}s" ) return True except Exception as e: logger.error(f"Lỗi ghi dữ liệu vào SQL: {str(e)}") raise ``` ### 5.3. Tích hợp với hệ thống giám sát ```python import requests import socket import json import traceback from datetime import datetime class SQLMonitor: """Lớp giám sát SQL Server và gửi thông báo khi có lỗi""" def __init__(self, webhook_url=None, email_config=None): """Khởi tạo với URL webhook và cấu hình email""" self.webhook_url = webhook_url self.email_config = email_config self.hostname = socket.gethostname() self.logger = logging.getLogger('sql_monitor') # Thiết lập handler cho logger handler = logging.FileHandler('logs/sql_monitor.log') handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def log_and_alert(self, error, query=None, context=None, alert_level='warning'): """Ghi log lỗi và gửi thông báo""" # Tạo thông tin lỗi error_data = { 'timestamp': datetime.now().isoformat(), 'hostname': self.hostname, 'error_type': type(error).__name__, 'error_message': str(error), 'stack_trace': traceback.format_exc(), 'alert_level': alert_level } if query: error_data['query'] = query if context: error_data['context'] = context # Ghi log if alert_level == 'critical': self.logger.critical(f"SQL Critical Error: {json.dumps(error_data)}") elif alert_level == 'error': self.logger.error(f"SQL Error: {json.dumps(error_data)}") else: self.logger.warning(f"SQL Warning: {json.dumps(error_data)}") # Gửi thông báo if alert_level in ('error', 'critical'): self._send_alert(error_data) def _send_alert(self, error_data): """Gửi thông báo lỗi qua webhook và/hoặc email""" # Gửi qua webhook (ví dụ: Slack, Teams, etc.) if self.webhook_url: try: # Định dạng thông báo message = { 'text': f"SQL Error on {error_data['hostname']}", 'attachments': [{ 'title': f"{error_data['error_type']}: {error_data['error_message']}", 'text': f"Context: {error_data.get('context', 'N/A')}\n" f"Query: {error_data.get('query', 'N/A')}\n" f"Time: {error_data['timestamp']}", 'color': 'danger' if error_data['alert_level'] == 'critical' else 'warning' }] } # Gửi request response = requests.post( self.webhook_url, json=message, timeout=5 ) if response.status_code == 200: self.logger.info("Đã gửi thông báo lỗi qua webhook") else: self.logger.warning( f"Không thể gửi thông báo qua webhook. " f"Status code: {response.status_code}" ) except Exception as e: self.logger.error(f"Lỗi khi gửi thông báo webhook: {str(e)}") # Gửi qua email if self.email_config: try: import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart # Tạo email msg = MIMEMultipart() msg['From'] = self.email_config['from'] msg['To'] = self.email_config['to'] msg['Subject'] = f"SQL Error on {error_data['hostname']}: {error_data['error_type']}" # Tạo nội dung body = f""" <h2>SQL Error Details</h2> <p><strong>Time:</strong> {error_data['timestamp']}</p> <p><strong>Host:</strong> {error_data['hostname']}</p> <p><strong>Error Type:</strong> {error_data['error_type']}</p> <p><strong>Error Message:</strong> {error_data['error_message']}</p> <h3>Context</h3> <p>{error_data.get('context', 'N/A')}</p> <h3>Query</h3> <pre>{error_data.get('query', 'N/A')}</pre> <h3>Stack Trace</h3> <pre>{error_data['stack_trace']}</pre> """ msg.attach(MIMEText(body, 'html')) # Gửi email server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port']) server.starttls() server.login(self.email_config['username'], self.email_config['password']) server.send_message(msg) server.quit() self.logger.info("Đã gửi thông báo lỗi qua email") except Exception as e: self.logger.error(f"Lỗi khi gửi email thông báo: {str(e)}") ``` ### 5.4. Hệ thống theo dõi hiệu suất truy vấn SQL ```python class SQLPerformanceTracker: """Lớp theo dõi hiệu suất truy vấn SQL""" def __init__(self, log_dir='logs/performance'): """Khởi tạo với thư mục log""" self.log_dir = log_dir # Tạo thư mục nếu chưa tồn tại if not os.path.exists(log_dir): os.makedirs(log_dir) # Thiết lập logger self.logger = logging.getLogger('sql_performance') handler = logging.FileHandler(os.path.join(log_dir, 'performance.log')) handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) # Thống kê hiệu suất self.stats = { 'total_queries': 0, 'total_duration': 0, 'max_duration': 0, 'slow_queries': 0, 'error_queries': 0, 'query_types': { 'SELECT': 0, 'INSERT': 0, 'UPDATE': 0, 'DELETE': 0, 'OTHER': 0 } } def track_query(self, query, duration, result_count=None, error=None): """Theo dõi một truy vấn SQL""" try: # Cập nhật thống kê self.stats['total_queries'] += 1 self.stats['total_duration'] += duration self.stats['max_duration'] = max(self.stats['max_duration'], duration) if duration > 1.0: # Truy vấn chậm > 1 giây self.stats['slow_queries'] += 1 if error: self.stats['error_queries'] += 1 # Xác định loại truy vấn first_word = query.strip().upper().split()[0] if query.strip() else "OTHER" if first_word in self.stats['query_types']: self.stats['query_types'][first_word] += 1 else: self.stats['query_types']['OTHER'] += 1 # Log thông tin truy vấn log_entry = { 'timestamp': datetime.now().isoformat(), 'query_type': first_word, 'duration': duration, 'result_count': result_count, 'has_error': error is not None, 'query_preview': query[:100] + ('...' if len(query) > 100 else '') } self.logger.info(f"Query Stats: {json.dumps(log_entry)}") # Log chi tiết cho truy vấn chậm if duration > 1.0: slow_log_entry = { 'timestamp': datetime.now().isoformat(), 'duration': duration, 'query': query, 'result_count': result_count } with open(os.path.join(self.log_dir, 'slow_queries.log'), 'a') as f: f.write(f"{json.dumps(slow_log_entry)}\n") return log_entry except Exception as e: print(f"Lỗi khi theo dõi truy vấn: {str(e)}") def get_statistics(self): """Lấy thống kê hiệu suất""" stats = self.stats.copy() # Tính thời gian trung bình if stats['total_queries'] > 0: stats['avg_duration'] = stats['total_duration'] / stats['total_queries'] else: stats['avg_duration'] = 0 return stats def generate_report(self, output_file=None): """Tạo báo cáo hiệu suất""" stats = self.get_statistics() report = f""" SQL Performance Report - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ============================================================= Summary: -------- Total Queries: {stats['total_queries']} Average Duration: {stats['avg_duration']:.6f} seconds Maximum Duration: {stats['max_duration']:.6f} seconds Slow Queries (>1s): {stats['slow_queries']} ({stats['slow_queries']/max(1, stats['total_queries'])*100:.2f}%) Error Queries: {stats['error_queries']} ({stats['error_queries']/max(1, stats['total_queries'])*100:.2f}%) Query Types: ------------ SELECT: {stats['query_types']['SELECT']} ({stats['query_types']['SELECT']/max(1, stats['total_queries'])*100:.2f}%) INSERT: {stats['query_types']['INSERT']} ({stats['query_types']['INSERT']/max(1, stats['total_queries'])*100:.2f}%) UPDATE: {stats['query_types']['UPDATE']} ({stats['query_types']['UPDATE']/max(1, stats['total_queries'])*100:.2f}%) DELETE: {stats['query_types']['DELETE']} ({stats['query_types']['DELETE']/max(1, stats['total_queries'])*100:.2f}%) OTHER: {stats['query_types']['OTHER']} ({stats['query_types']['OTHER']/max(1, stats['total_queries'])*100:.2f}%) """ if output_file: with open(output_file, 'w') as f: f.write(report) return report ``` ## 6. Tích hợp vào hệ thống trực tiếp ### 6.1. Tạo decorators cho xử lý lỗi tự động ```python from functools import wraps import time import traceback def log_sql_operation(logger): """Decorator để log các thao tác SQL""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Lấy tên hàm func_name = func.__name__ start_time = time.time() try: # Thực thi hàm logger.info(f"Bắt đầu thực thi {func_name}") result = func(*args, **kwargs) # Tính thời gian thực thi duration = time.time() - start_time # Log kết quả thành công logger.info(f"Thực thi {func_name} thành công trong {duration:.3f}s") return result except Exception as e: # Tính thời gian thực thi duration = time.time() - start_time # Log lỗi error_type = type(e).__name__ error_message = str(e) stack_trace = traceback.format_exc() logger.error( f"Lỗi khi thực thi {func_name}: {error_type} - {error_message}\n" f"Thời gian: {duration:.3f}s\n" f"Stack trace: {stack_trace}" ) # Ném lại exception raise return wrapper return decorator def retry_on_specific_errors(max_attempts=3, delay=2, error_types=(Exception,)): """Decorator để thử lại khi gặp lỗi cụ thể""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): attempts = 0 last_exception = None while attempts < max_attempts: try: return func(*args, **kwargs) except error_types as e: attempts += 1 last_exception = e # Tăng thời gian chờ theo số lần thử wait_time = delay * attempts if attempts < max_attempts: logging.warning( f"Lỗi khi thực thi {func.__name__}: {str(e)}\n" f"Thử lại lần {attempts}/{max_attempts} sau {wait_time} giây" ) time.sleep(wait_time) else: logging.error( f"Đã thử lại {max_attempts} lần nhưng vẫn thất bại: {str(e)}" ) # Nếu đã thử hết số lần mà vẫn lỗi raise last_exception return wrapper return decorator ``` ### 6.2. Tích hợp với FastAPI hoặc Flask ```python from fastapi import FastAPI, HTTPException, Depends from fastapi.responses import JSONResponse from pydantic import BaseModel import logging import time # Thiết lập logger logger = logging.getLogger("api_sql_operations") logger.setLevel(logging.INFO) handler = logging.FileHandler("logs/api_sql.log") handler.setFormatter(logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' )) logger.addHandler(handler) # Khởi tạo FastAPI app app = FastAPI(title="SQL Operations API") # Khởi tạo đối tượng Database db = SQLServerDatabase( conn_string='DRIVER={SQL Server};SERVER=your_server;DATABASE=your_db;UID=your_user;PWD=your_password', app_name="API_Service", log_dir="logs/api" ) # Model cho request class DataQuery(BaseModel): query: str params: list = None # Middleware để log request và response @app.middleware("http") async def log_requests(request, call_next): start_time = time.time() # Log request logger.info(f"Request: {request.method} {request.url}") try: # Xử lý request response = await call_next(request) # Tính thời gian xử lý duration = time.time() - start_time # Log response logger.info(f"Response: {response.status_code} in {duration:.3f}s") return response except Exception as e: # Log lỗi duration = time.time() - start_time logger.error(f"Error: {str(e)} in {duration:.3f}s") # Trả về lỗi 500 return JSONResponse( status_code=500, content={"detail": "Internal Server Error"} ) # Endpoint để thực thi truy vấn SELECT @app.post("/query/select") async def execute_select(query_data: DataQuery): try: # Log truy vấn logger.info(f"Executing SELECT query: {query_data.query[:100]}...") # Kiểm tra xem có phải truy vấn SELECT không if not query_data.query.strip().upper().startswith("SELECT"): raise HTTPException( status_code=400, detail="Only SELECT queries are allowed for this endpoint" ) # Thực thi truy vấn start_time = time.time() results = db.execute_query(query_data.query, query_data.params) duration = time.time() - start_time # Log kết quả logger.info(f"Query executed in {duration:.3f}s, returned {len(results)} rows") return { "success": True, "duration": duration, "row_count": len(results), "results": results } except SQLServerError as e: # Log lỗi logger.error(f"SQL Error: {e.message}") # Trả về lỗi raise HTTPException( status_code=400, detail=f"SQL Error: {e.message}" ) except Exception as e: # Log lỗi không xác định logger.error(f"Unexpected error: {str(e)}") # Trả về lỗi 500 raise HTTPException( status_code=500, detail="Internal server error" ) ``` ## Kết luận ![Bắt lỗi và log chi tiết SQL Server](/img/blog/sql-error-logging.jpg) Bắt lỗi và log chi tiết khi thao tác SQL Server bằng Python là một phần không thể thiếu trong việc xây dựng các ứng dụng dữ liệu chuyên nghiệp, đáng tin cậy. Một hệ thống xử lý lỗi và ghi log tốt không chỉ giúp phát hiện và khắc phục sự cố nhanh chóng mà còn cung cấp thông tin quý giá để tối ưu hóa hiệu suất và độ tin cậy của hệ thống. Các nguyên tắc quan trọng cần nhớ: 1. **Luôn xử lý lỗi một cách cụ thể**: Phân loại và xử lý từng loại lỗi riêng biệt thay vì bắt tất cả các lỗi cùng một cách. 2. **Ghi log có cấu trúc và chi tiết**: Bao gồm thông tin về thời gian, ngữ cảnh, truy vấn và tham số để dễ dàng phân tích. 3. **Sử dụng các cấp độ log phù hợp**: DEBUG, INFO, WARNING, ERROR, CRITICAL cho từng loại thông tin khác nhau. 4. **Áp dụng log rotation**: Ngăn chặn các file log quá lớn và khó quản lý. 5. **Tích hợp với hệ thống giám sát**: Gửi thông báo khi có lỗi nghiêm trọng để xử lý kịp thời. 6. **Theo dõi hiệu suất truy vấn**: Phát hiện và tối ưu các truy vấn chậm. Với các kỹ thuật và công cụ được trình bày trong bài viết này, bạn có thể xây dựng một hệ thống logging và xử lý lỗi toàn diện, giúp ứng dụng của bạn trở nên ổn định, dễ bảo trì và hiệu quả hơn khi làm việc với SQL Server từ Python.
Chia sẻ:

Bài viết liên quan

Không có bài viết liên quan.